package org.luosl.webmagicx.monitor

import java.util
import java.util.concurrent.atomic.AtomicInteger

import org.luosl.webmagicx.SpiderCell
import org.luosl.webmagicx.utils.Logging
import org.luosl.webmagicx.conf.{SpiderConf, Task}
import org.quartz._
import org.quartz.impl.StdSchedulerFactory
import us.codecraft.webmagic.Spider.Status

import scala.collection.JavaConverters._

/**
  * Registers spiders with a Quartz scheduler and remembers which jobs and
  * triggers were created on behalf of each spider.
  *
  * Each registered spider is stored under its id together with the
  * (JobKey, TriggerKey) pairs scheduled for it, so they can be removed later.
  * The registry is a plain `java.util.HashMap`; this class performs no
  * synchronization of its own.
  *
  * Created by luosl on 2017/12/13.
  */
class TaskMonitor {

  /** Quartz group name shared by every job and trigger created by this monitor. */
  private val quartzGroup:String = "group1"

  private val schedulerFactory:StdSchedulerFactory = new StdSchedulerFactory()
  private val scheduler:Scheduler = schedulerFactory.getScheduler()
  // Counters used to generate unique job / trigger names.
  private val jobSequence:AtomicInteger = new AtomicInteger(1)
  private val triggerSequence:AtomicInteger = new AtomicInteger(1)
  // spider id -> (spider, keys of every job/trigger scheduled for it)
  private val registerSpiders:util.HashMap[String,(SpiderCell, List[(JobKey, TriggerKey)])] =
    new util.HashMap[String,(SpiderCell, List[(JobKey, TriggerKey)])]()

  /**
    * Submit a spider task.
    *
    * When the spider configuration carries a task definition, a cron job is
    * scheduled with its cron expression and, if `startNow` is set, an
    * additional immediate job is scheduled as well. Without a task definition
    * only the immediate job is scheduled.
    *
    * @param spiderCell the spider to schedule
    * @throws RuntimeException if a spider with the same id is already registered
    */
  def submitSpiderTask(spiderCell: SpiderCell): Unit ={
    if(registerSpiders.containsKey(spiderCell.id)){
      throw new RuntimeException(s"spider任务[id=${spiderCell.id}]已经存在")
    }
    val keys:List[(JobKey, TriggerKey)] = spiderCell.sc.task match {
      case Some(Task(startNow, corn)) =>
        // The cron job is always scheduled first; the immediate run is optional.
        val cronKeys:(JobKey, TriggerKey) = subCornJob(spiderCell, corn)
        if(startNow) List(cronKeys, subStartNowJob(spiderCell)) else List(cronKeys)
      case None => List(subStartNowJob(spiderCell))
    }
    registerSpiders.put(spiderCell.id, spiderCell -> keys)
  }

  /**
    * Submit a job that starts immediately.
    *
    * The trigger is built without an explicit schedule or start time, so
    * Quartz's defaults apply (NOTE(review): expected to be a single firing at
    * build time -- confirm against the Quartz version in use).
    *
    * @param spiderCell the spider to start
    * @return keys of the scheduled job and trigger
    */
  private def subStartNowJob(spiderCell: SpiderCell): (JobKey, TriggerKey) =
    scheduleSpider(spiderCell, newTriggerBuilder().build())

  /**
    * Submit a cron job.
    *
    * @param spiderCell the spider to start on every firing
    * @param corn cron expression handed to `CronScheduleBuilder.cronSchedule`
    * @return keys of the scheduled job and trigger
    */
  private def subCornJob(spiderCell: SpiderCell,corn:String): (JobKey, TriggerKey) ={
    val cronTrigger:Trigger = newTriggerBuilder()
      .withSchedule(CronScheduleBuilder.cronSchedule(corn))
      .build()
    scheduleSpider(spiderCell, cronTrigger)
  }

  /** Start a trigger builder with the next sequential trigger name in the shared group. */
  private def newTriggerBuilder():TriggerBuilder[Trigger] =
    TriggerBuilder.newTrigger()
      .withIdentity(s"trigger-${triggerSequence.getAndIncrement()}", quartzGroup)

  /** Build a job for the spider, schedule it with the given trigger and return both keys. */
  private def scheduleSpider(spiderCell: SpiderCell, trigger:Trigger): (JobKey, TriggerKey) ={
    val job:JobDetail = buildJob(spiderCell,s"spider-${jobSequence.getAndIncrement()}")
    scheduler.scheduleJob(job,trigger)
    (job.getKey, trigger.getKey)
  }

  /**
    * Build a job.
    *
    * The spider is placed in the job data map under the key "spiderCell",
    * which is where [[StartSpiderJob]] looks it up.
    *
    * @param spiderCell the spider the job should start
    * @param jobId name of the job inside the shared group
    * @return the job detail, not yet scheduled
    */
  private def buildJob(spiderCell: SpiderCell, jobId:String):JobDetail = {
    val jobData:JobDataMap = new JobDataMap(Map("spiderCell" -> spiderCell).asJava)
    JobBuilder.newJob(classOf[StartSpiderJob])
      .withIdentity(jobId, quartzGroup)
      .setJobData(jobData)
      .build()
  }

  /**
    * Remove a registered spider together with every job and trigger that was
    * scheduled for it.
    *
    * The spider is dropped from the registry only after all of its jobs have
    * been handed to the scheduler for deletion, so a scheduler failure halfway
    * through leaves the entry in place and the call can be retried. This
    * method does not call `stop()` on the spider itself.
    *
    * @param id id of the spider to remove
    * @throws RuntimeException if no spider is registered under `id`
    */
  def remove(id:String): Unit ={
    Option(registerSpiders.get(id)) match {
      case None => throw new RuntimeException(s"无效的spiderId:$id")
      case Some((_, keys)) =>
        keys.foreach{ case (jobKey, triggerKey) =>
          scheduler.deleteJob(jobKey)
          // Kept from the original as a safety net; deleting the job is
          // expected to have removed its trigger already.
          scheduler.unscheduleJob(triggerKey)
        }
        registerSpiders.remove(id)
    }
  }

  /**
    * Shut down the scheduled-task service: stop every registered spider, then
    * shut the scheduler down.
    */
  def shutdown(): Unit ={
    registerSpiders.values().asScala.foreach{ case (cell, _) => cell.spider.stop() }
    scheduler.shutdown()
  }

  /** Start the underlying scheduler. */
  def start(): Unit = scheduler.start()

  /**
    * Number of spider tasks that currently count as running.
    *
    * A spider with a task (cron) definition always counts. A one-shot spider
    * counts only while its status is `Running` or `Init`.
    *
    * @return the number of such spiders in the registry
    */
  def runTaskNumber():Int = {
    registerSpiders.values().asScala.count{ case (cell, _) =>
      cell.sc.task.isDefined || {
        // Read the status once so a state change between two reads cannot
        // make an active spider look inactive.
        val current = cell.status()
        current == Status.Running || current == Status.Init
      }
    }
  }

}

/**
  * Groups a spider with the ids of the jobs and triggers associated with it.
  *
  * @param jobIds     ids of the jobs
  * @param triggerIds ids of the triggers
  * @param spiderCell the spider these ids belong to
  */
case class JobCell(
  jobIds:Seq[String],
  triggerIds:Seq[String],
  spiderCell: SpiderCell
)

/**
  * Quartz job that starts the spider stored in the job's data map.
  *
  * The spider is looked up under the key "spiderCell". If it is not currently
  * running it is (re)started; otherwise the firing is only logged.
  */
class StartSpiderJob extends Job with Logging{
  /**
    * Start the spider attached to this job unless it is already running.
    *
    * @param context Quartz execution context carrying the job data map
    * @throws JobExecutionException if the job data holds no `SpiderCell`
    *                               under the key "spiderCell"
    */
  override def execute(context: JobExecutionContext): Unit = {
    context.getJobDetail.getJobDataMap.get("spiderCell") match {
      case spiderCell: SpiderCell =>
        if(spiderCell.status() != Status.Running){
          spiderCell.restart()
          logInfo(s"spider[id=${spiderCell.id}]启动成功...")
        }else{
          logInfo(s"spider[id=${spiderCell.id}]正在运行...")
        }
      case other =>
        // A type pattern never matches null, so a missing entry lands here too
        // and is reported explicitly rather than as a NullPointerException.
        throw new JobExecutionException(
          s"job data 'spiderCell' is missing or not a SpiderCell: $other")
    }
  }
}
